// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dart.async;

// -------------------------------------------------------------------
// Controller for creating and adding events to a stream.
// -------------------------------------------------------------------

/**
 * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks.
 */
typedef void ControllerCallback();

/**
 * Type of stream controller `onCancel` callbacks.
 *
 * The callback may return either `void` or a future.
 */
typedef ControllerCancelCallback();

/**
 * A controller with the stream it controls.
 *
 * This controller allows sending data, error and done events on
 * its [stream].
 * This class can be used to create a simple stream that others
 * can listen on, and to push events to that stream.
 *
 * It's possible to check whether the stream is paused or not, and whether
 * it has subscribers or not, as well as getting a callback when either of
 * these change.
 */
abstract class StreamController<T> implements StreamSink<T> {
  /** The stream that this controller is controlling. */
  Stream<T> get stream;

  /**
   * A controller with a [stream] that supports only one single subscriber.
   *
   * If [sync] is true, the returned stream controller is a
   * [SynchronousStreamController], and must be used with the care
   * and attention necessary to not break the [Stream] contract. If in doubt,
   * use the non-sync version.
   *
   * Using an asynchronous controller will never give the wrong
   * behavior, but using a synchronous controller incorrectly can cause
   * otherwise correct programs to break.
   *
   * A synchronous controller is only intended for optimizing event
   * propagation when one asynchronous event immediately triggers another.
   * It should not be used unless the calls to [add] or [addError]
   * are guaranteed to occur in places where it won't break `Stream` invariants.
   *
   * Use synchronous controllers only to forward (potentially transformed)
   * events from another stream or a future.
   *
   * A Stream should be inert until a subscriber starts listening on it (using
   * the [onListen] callback to start producing events). Streams should not
   * leak resources (like websockets) when no user ever listens on the stream.
   *
   * The controller buffers all incoming events until a subscriber is
   * registered, but this feature should only be used in rare circumstances.
   *
   * The [onPause] function is called when the stream becomes
   * paused. [onResume] is called when the stream resumed.
   *
   * The [onListen] callback is called when the stream
   * receives its listener and [onCancel] when the listener ends
   * its subscription. If [onCancel] needs to perform an asynchronous operation,
   * [onCancel] should return a future that completes when the cancel operation
   * is done.
   *
   * If the stream is canceled before the controller needs new data the
   * [onResume] call might not be executed.
   */
  factory StreamController(
      {void onListen(),
      void onPause(),
      void onResume(),
      onCancel(),
      bool sync: false}) {
    return sync
        ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
        : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
  }

  /**
   * A controller where [stream] can be listened to more than once.
   *
   * The [Stream] returned by [stream] is a broadcast stream.
   * It can be listened to more than once.
   *
   * A Stream should be inert until a subscriber starts listening on it (using
   * the [onListen] callback to start producing events). Streams should not
   * leak resources (like websockets) when no user ever listens on the stream.
   *
   * Broadcast streams do not buffer events when there is no listener.
   *
   * The controller distributes any events to all currently subscribed
   * listeners at the time when [add], [addError] or [close] is called.
   * It is not allowed to call `add`, `addError`, or `close` before a previous
   * call has returned. The controller does not have any internal queue of
   * events, and if there are no listeners at the time the event is added,
   * it will just be dropped, or, if it is an error, be reported as uncaught.
   *
   * Each listener subscription is handled independently,
   * and if one pauses, only the pausing listener is affected.
   * A paused listener will buffer events internally until unpaused or canceled.
   *
   * If [sync] is true, events may be fired directly by the stream's
   * subscriptions during an [add], [addError] or [close] call.
   * The returned stream controller is a [SynchronousStreamController],
   * and must be used with the care and attention necessary to not break
   * the [Stream] contract.
   * See [Completer.sync] for some explanations on when a synchronous
   * dispatching can be used.
   * If in doubt, keep the controller non-sync.
   *
   * If [sync] is false, the event will always be fired at a later time,
   * after the code adding the event has completed.
   * In that case, no guarantees are given with regard to when
   * multiple listeners get the events, except that each listener will get
   * all events in the correct order. Each subscription handles the events
   * individually.
   * If two events are sent on an async controller with two listeners,
   * one of the listeners may get both events
   * before the other listener gets any.
   * A listener must be subscribed both when the event is initiated
   * (that is, when [add] is called)
   * and when the event is later delivered,
   * in order to receive the event.
   *
   * The [onListen] callback is called when the first listener is subscribed,
   * and the [onCancel] is called when there are no longer any active listeners.
   * If a listener is added again later, after the [onCancel] was called,
   * the [onListen] will be called again.
   */
  factory StreamController.broadcast(
      {void onListen(), void onCancel(), bool sync: false}) {
    return sync
        ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
        : new _AsyncBroadcastStreamController<T>(onListen, onCancel);
  }

  /**
   * The callback which is called when the stream is listened to.
   *
   * May be set to `null`, in which case no callback will happen.
   */
  ControllerCallback get onListen;

  void set onListen(void onListenHandler());

  /**
   * The callback which is called when the stream is paused.
   *
   * May be set to `null`, in which case no callback will happen.
   *
   * Pause related callbacks are not supported on broadcast stream controllers.
   */
  ControllerCallback get onPause;

  void set onPause(void onPauseHandler());

  /**
   * The callback which is called when the stream is resumed.
   *
   * May be set to `null`, in which case no callback will happen.
   *
   * Pause related callbacks are not supported on broadcast stream controllers.
   */
  ControllerCallback get onResume;

  void set onResume(void onResumeHandler());

  /**
   * The callback which is called when the stream is canceled.
   *
   * May be set to `null`, in which case no callback will happen.
   */
  ControllerCancelCallback get onCancel;

  void set onCancel(onCancelHandler());

  /**
   * Returns a view of this object that only exposes the [StreamSink] interface.
   */
  StreamSink<T> get sink;

  /**
   * Whether the stream controller is closed for adding more events.
   *
   * The controller becomes closed by calling the [close] method.
   * New events cannot be added, by calling [add] or [addError],
   * to a closed controller.
   *
   * If the controller is closed,
   * the "done" event might not have been delivered yet,
   * but it has been scheduled, and it is too late to add more events.
   */
  bool get isClosed;

  /**
   * Whether the subscription would need to buffer events.
   *
   * This is the case if the controller's stream has a listener and it is
   * paused, or if it has not received a listener yet. In that case, the
   * controller is considered paused as well.
   *
   * A broadcast stream controller is never considered paused. It always
   * forwards its events to all uncanceled subscriptions, if any,
   * and let the subscriptions handle their own pausing and buffering.
   */
  bool get isPaused;

  /** Whether there is a subscriber on the [Stream]. */
  bool get hasListener;

  /**
   * Sends a data [event].
   *
   * Listeners receive this event in a later microtask.
   *
   * Note that a synchronous controller (created by passing true to the `sync`
   * parameter of the `StreamController` constructor) delivers events
   * immediately. Since this behavior violates the contract mentioned here,
   * synchronous controllers should only be used as described in the
   * documentation to ensure that the delivered events always *appear* as if
   * they were delivered in a separate microtask.
   */
  void add(T event);

  /**
   * Sends or enqueues an error event.
   *
   * If [error] is `null`, it is replaced by a [NullThrownError].
   *
   * Listeners receive this event at a later microtask. This behavior can be
   * overridden by using `sync` controllers. Note, however, that sync
   * controllers have to satisfy the preconditions mentioned in the
   * documentation of the constructors.
   */
  void addError(Object error, [StackTrace stackTrace]);

  /**
   * Closes the stream.
   *
   * Listeners receive the done event at a later microtask. This behavior can be
   * overridden by using `sync` controllers. Note, however, that sync
   * controllers have to satisfy the preconditions mentioned in the
   * documentation of the constructors.
   */
  Future close();

  /**
   * Receives events from [source] and puts them into this controller's stream.
   *
   * Returns a future which completes when the source stream is done.
   *
   * Events must not be added directly to this controller using [add],
   * [addError], [close] or [addStream], until the returned future
   * is complete.
   *
   * Data and error events are forwarded to this controller's stream. A done
   * event on the source will end the `addStream` operation and complete the
   * returned future.
   *
   * If [cancelOnError] is true, only the first error on [source] is
   * forwarded to the controller's stream, and the `addStream` ends
   * after this. If [cancelOnError] is false, all errors are forwarded
   * and only a done event will end the `addStream`.
   * If [cancelOnError] is omitted, it defaults to false.
   */
  Future addStream(Stream<T> source, {bool cancelOnError});
}

/**
 * A stream controller that delivers its events synchronously.
 *
 * A synchronous stream controller is intended for cases where
 * an already asynchronous event triggers an event on a stream.
 *
 * Instead of adding the event to the stream in a later microtask,
 * causing extra latency, the event is instead fired immediately by the
 * synchronous stream controller, as if the stream event was
 * the current event or microtask.
 *
 * The synchronous stream controller can be used to break the contract
 * on [Stream], and it must be used carefully to avoid doing so.
 *
 * The only advantage to using a [SynchronousStreamController] over a
 * normal [StreamController] is the improved latency.
 * Only use the synchronous version if the improvement is significant,
 * and if its use is safe. Otherwise just use a normal stream controller,
 * which will always have the correct behavior for a [Stream], and won't
 * accidentally break other code.
 *
 * Adding events to a synchronous controller should only happen as the
 * very last part of the handling of the original event.
 * At that point, adding an event to the stream is equivalent to
 * returning to the event loop and adding the event in the next microtask.
 *
 * Each listener callback will be run as if it was a top-level event
 * or microtask. This means that if it throws, the error will be reported as
 * uncaught as soon as possible.
 * This is one reason to add the event as the last thing in the original event
 * handler - any action done after adding the event will delay the report of
 * errors in the event listener callbacks.
 *
 * If an event is added in a setting that isn't known to be another event,
 * it may cause the stream's listener to get that event before the listener
 * is ready to handle it. We promise that after calling [Stream.listen],
 * you won't get any events until the code doing the listen has completed.
 * Calling [add] in response to a function call of unknown origin may break
 * that promise.
 *
 * An [onListen] callback from the controller is *not* an asynchronous event,
 * and adding events to the controller in the `onListen` callback is always
 * wrong. The events will be delivered before the listener has even received
 * the subscription yet.
 *
 * The synchronous broadcast stream controller also has a restrictions that a
 * normal stream controller does not:
 * The [add], [addError], [close] and [addStream] methods *must not* be
 * called while an event is being delivered.
 * That is, if a callback on a subscription on the controller's stream causes
 * a call to any of the functions above, the call will fail.
 * A broadcast stream may have more than one listener, and if an
 * event is added synchronously while another is being also in the process
 * of being added, the latter event might reach some listeners before
 * the former. To prevent that, an event cannot be added while a previous
 * event is being fired.
 * This guarantees that an event is fully delivered when the
 * first [add], [addError] or [close] returns,
 * and further events will be delivered in the correct order.
 *
 * This still only guarantees that the event is delivered to the subscription.
 * If the subscription is paused, the actual callback may still happen later,
 * and the event will instead be buffered by the subscription.
 * Barring pausing, and the following buffered events that haven't been
 * delivered yet, callbacks will be called synchronously when an event is added.
 *
 * Adding an event to a synchronous non-broadcast stream controller while
 * another event is in progress may cause the second event to be delayed
 * and not be delivered synchronously, and until that event is delivered,
 * the controller will not act synchronously.
 */
abstract class SynchronousStreamController<T> implements StreamController<T> {
  /**
   * Adds event to the controller's stream.
   *
   * As [StreamController.add], but must not be called while an event is
   * being added by [add], [addError] or [close].
   */
  void add(T data);

  /**
   * Adds error to the controller's stream.
   *
   * As [StreamController.addError], but must not be called while an event is
   * being added by [add], [addError] or [close].
   */
  void addError(Object error, [StackTrace stackTrace]);

  /**
   * Closes the controller's stream.
   *
   * As [StreamController.close], but must not be called while an event is
   * being added by [add], [addError] or [close].
   */
  Future close();
}

abstract class _StreamControllerLifecycle<T> {
  StreamSubscription<T> _subscribe(
      void onData(T data), Function onError, void onDone(), bool cancelOnError);
  void _recordPause(StreamSubscription<T> subscription) {}
  void _recordResume(StreamSubscription<T> subscription) {}
  Future _recordCancel(StreamSubscription<T> subscription) => null;
}

// Base type for implementations of stream controllers.
abstract class _StreamControllerBase<T>
    implements
        StreamController<T>,
        _StreamControllerLifecycle<T>,
        _EventSink<T>,
        _EventDispatch<T> {}

/**
 * Default implementation of [StreamController].
 *
 * Controls a stream that only supports a single controller.
 */
abstract class _StreamController<T> implements _StreamControllerBase<T> {
  // The states are bit-flags. More than one can be set at a time.
  //
  // The "subscription state" goes through the states:
  //   initial -> subscribed -> canceled.
  // These are mutually exclusive.
  // The "closed" state records whether the [close] method has been called
  // on the controller. This can be done at any time. If done before
  // subscription, the done event is queued. If done after cancel, the done
  // event is ignored (just as any other event after a cancel).

  /** The controller is in its initial state with no subscription. */
  static const int _STATE_INITIAL = 0;
  /** The controller has a subscription, but hasn't been closed or canceled. */
  static const int _STATE_SUBSCRIBED = 1;
  /** The subscription is canceled. */
  static const int _STATE_CANCELED = 2;
  /** Mask for the subscription state. */
  static const int _STATE_SUBSCRIPTION_MASK = 3;

  // The following state relate to the controller, not the subscription.
  // If closed, adding more events is not allowed.
  // If executing an [addStream], new events are not allowed either, but will
  // be added by the stream.

  /**
   * The controller is closed due to calling [close].
   *
   * When the stream is closed, you can neither add new events nor add new
   * listeners.
   */
  static const int _STATE_CLOSED = 4;
  /**
   * The controller is in the middle of an [addStream] operation.
   *
   * While adding events from a stream, no new events can be added directly
   * on the controller.
   */
  static const int _STATE_ADDSTREAM = 8;

  /**
   * Field containing different data depending on the current subscription
   * state.
   *
   * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents]
   * for events added to the controller before a subscription.
   *
   * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription.
   *
   * When [_state] is [_STATE_CANCELED] the field is currently not used.
   */
  var _varData;

  /** Current state of the controller. */
  int _state = _STATE_INITIAL;

  /**
   * Future completed when the stream sends its last event.
   *
   * This is also the future returned by [close].
   */
  // TODO(lrn): Could this be stored in the varData field too, if it's not
  // accessed until the call to "close"? Then we need to special case if it's
  // accessed earlier, or if close is called before subscribing.
  _Future _doneFuture;

  ControllerCallback onListen;
  ControllerCallback onPause;
  ControllerCallback onResume;
  ControllerCancelCallback onCancel;

  _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel);

  // Return a new stream every time. The streams are equal, but not identical.
  Stream<T> get stream => new _ControllerStream<T>(this);

  /**
   * Returns a view of this object that only exposes the [StreamSink] interface.
   */
  StreamSink<T> get sink => new _StreamSinkWrapper<T>(this);

  /**
   * Whether a listener has existed and been canceled.
   *
   * After this, adding more events will be ignored.
   */
  bool get _isCanceled => (_state & _STATE_CANCELED) != 0;

  /** Whether there is an active listener. */
  bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0;

  /** Whether there has not been a listener yet. */
  bool get _isInitialState =>
      (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;

  bool get isClosed => (_state & _STATE_CLOSED) != 0;

  bool get isPaused =>
      hasListener ? _subscription._isInputPaused : !_isCanceled;

  bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0;

  /** New events may not be added after close, or during addStream. */
  bool get _mayAddEvent => (_state < _STATE_CLOSED);

  // Returns the pending events.
  // Pending events are events added before a subscription exists.
  // They are added to the subscription when it is created.
  // Pending events, if any, are kept in the _varData field until the
  // stream is listened to.
  // While adding a stream, pending events are moved into the
  // state object to allow the state object to use the _varData field.
  _PendingEvents<T> get _pendingEvents {
    assert(_isInitialState);
    if (!_isAddingStream) {
      return _varData;
    }
    _StreamControllerAddStreamState<T> state = _varData;
    return state.varData;
  }

  // Returns the pending events, and creates the object if necessary.
  _StreamImplEvents<T> _ensurePendingEvents() {
    assert(_isInitialState);
    if (!_isAddingStream) {
      _varData ??= new _StreamImplEvents<T>();
      return _varData;
    }
    _StreamControllerAddStreamState<T> state = _varData;
    if (state.varData == null) state.varData = new _StreamImplEvents<T>();
    return state.varData;
  }

  // Get the current subscription.
  // If we are adding a stream, the subscription is moved into the state
  // object to allow the state object to use the _varData field.
  _ControllerSubscription<T> get _subscription {
    assert(hasListener);
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      return addState.varData;
    }
    return _varData;
  }

  /**
   * Creates an error describing why an event cannot be added.
   *
   * The reason, and therefore the error message, depends on the current state.
   */
  Error _badEventState() {
    if (isClosed) {
      return new StateError("Cannot add event after closing");
    }
    assert(_isAddingStream);
    return new StateError("Cannot add event while adding a stream");
  }

  // StreamSink interface.
  Future addStream(Stream<T> source, {bool cancelOnError}) {
    if (!_mayAddEvent) throw _badEventState();
    if (_isCanceled) return new _Future.immediate(null);
    _StreamControllerAddStreamState<T> addState =
        new _StreamControllerAddStreamState<T>(
            this, _varData, source, cancelOnError ?? false);
    _varData = addState;
    _state |= _STATE_ADDSTREAM;
    return addState.addStreamFuture;
  }

  /**
   * Returns a future that is completed when the stream is done
   * processing events.
   *
   * This happens either when the done event has been sent, or if the
   * subscriber of a single-subscription stream is cancelled.
   */
  Future get done => _ensureDoneFuture();

  Future _ensureDoneFuture() {
    _doneFuture ??= _isCanceled ? Future._nullFuture : new _Future();
    return _doneFuture;
  }

  /**
   * Send or enqueue a data event.
   */
  void add(T value) {
    if (!_mayAddEvent) throw _badEventState();
    _add(value);
  }

  /**
   * Send or enqueue an error event.
   */
  void addError(Object error, [StackTrace stackTrace]) {
    if (!_mayAddEvent) throw _badEventState();
    error = _nonNullError(error);
    AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
    if (replacement != null) {
      error = _nonNullError(replacement.error);
      stackTrace = replacement.stackTrace;
    }
    _addError(error, stackTrace);
  }

  /**
   * Closes this controller and sends a done event on the stream.
   *
   * The first time a controller is closed, a "done" event is added to its
   * stream.
   *
   * You are allowed to close the controller more than once, but only the first
   * call has any effect.
   *
   * After closing, no further events may be added using [add], [addError]
   * or [addStream].
   *
   * The returned future is completed when the done event has been delivered.
   */
  Future close() {
    if (isClosed) {
      return _ensureDoneFuture();
    }
    if (!_mayAddEvent) throw _badEventState();
    _closeUnchecked();
    return _ensureDoneFuture();
  }

  void _closeUnchecked() {
    _state |= _STATE_CLOSED;
    if (hasListener) {
      _sendDone();
    } else if (_isInitialState) {
      _ensurePendingEvents().add(const _DelayedDone());
    }
  }

  // EventSink interface. Used by the [addStream] events.

  // Add data event, used both by the [addStream] events and by [add].
  void _add(T value) {
    if (hasListener) {
      _sendData(value);
    } else if (_isInitialState) {
      _ensurePendingEvents().add(new _DelayedData<T>(value));
    }
  }

  void _addError(Object error, StackTrace stackTrace) {
    if (hasListener) {
      _sendError(error, stackTrace);
    } else if (_isInitialState) {
      _ensurePendingEvents().add(new _DelayedError(error, stackTrace));
    }
  }

  void _close() {
    // End of addStream stream.
    assert(_isAddingStream);
    _StreamControllerAddStreamState<T> addState = _varData;
    _varData = addState.varData;
    _state &= ~_STATE_ADDSTREAM;
    addState.complete();
  }

  // _StreamControllerLifeCycle interface

  StreamSubscription<T> _subscribe(void onData(T data), Function onError,
      void onDone(), bool cancelOnError) {
    if (!_isInitialState) {
      throw new StateError("Stream has already been listened to.");
    }
    _ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
        this, onData, onError, onDone, cancelOnError);

    _PendingEvents<T> pendingEvents = _pendingEvents;
    _state |= _STATE_SUBSCRIBED;
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      addState.varData = subscription;
      addState.resume();
    } else {
      _varData = subscription;
    }
    subscription._setPendingEvents(pendingEvents);
    subscription._guardCallback(() {
      _runGuarded(onListen);
    });

    return subscription;
  }

  Future _recordCancel(StreamSubscription<T> subscription) {
    // When we cancel, we first cancel any stream being added,
    // Then we call `onCancel`, and finally the _doneFuture is completed.
    // If either of addStream's cancel or `onCancel` returns a future,
    // we wait for it before continuing.
    // Any error during this process ends up in the returned future.
    // If more errors happen, we act as if it happens inside nested try/finallys
    // or whenComplete calls, and only the last error ends up in the
    // returned future.
    Future result;
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      result = addState.cancel();
    }
    _varData = null;
    _state =
        (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;

    if (onCancel != null) {
      if (result == null) {
        // Only introduce a future if one is needed.
        // If _onCancel returns null, no future is needed.
        try {
          result = onCancel();
        } catch (e, s) {
          // Return the error in the returned future.
          // Complete it asynchronously, so there is time for a listener
          // to handle the error.
          result = new _Future().._asyncCompleteError(e, s);
        }
      } else {
        // Simpler case when we already know that we will return a future.
        result = result.whenComplete(onCancel);
      }
    }

    void complete() {
      if (_doneFuture != null && _doneFuture._mayComplete) {
        _doneFuture._asyncComplete(null);
      }
    }

    if (result != null) {
      result = result.whenComplete(complete);
    } else {
      complete();
    }

    return result;
  }

  void _recordPause(StreamSubscription<T> subscription) {
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      addState.pause();
    }
    _runGuarded(onPause);
  }

  void _recordResume(StreamSubscription<T> subscription) {
    if (_isAddingStream) {
      _StreamControllerAddStreamState<T> addState = _varData;
      addState.resume();
    }
    _runGuarded(onResume);
  }
}

abstract class _SyncStreamControllerDispatch<T>
    implements _StreamController<T>, SynchronousStreamController<T> {
  int get _state;
  void set _state(int state);

  void _sendData(T data) {
    _subscription._add(data);
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addError(error, stackTrace);
  }

  void _sendDone() {
    _subscription._close();
  }
}

abstract class _AsyncStreamControllerDispatch<T>
    implements _StreamController<T> {
  void _sendData(T data) {
    _subscription._addPending(new _DelayedData<T>(data));
  }

  void _sendError(Object error, StackTrace stackTrace) {
    _subscription._addPending(new _DelayedError(error, stackTrace));
  }

  void _sendDone() {
    _subscription._addPending(const _DelayedDone());
  }
}

// TODO(lrn): Use common superclass for callback-controllers when VM supports
// constructors in mixin superclasses.

class _AsyncStreamController<T> = _StreamController<T>
    with _AsyncStreamControllerDispatch<T>;

class _SyncStreamController<T> = _StreamController<T>
    with _SyncStreamControllerDispatch<T>;

typedef _NotificationHandler();

void _runGuarded(_NotificationHandler notificationHandler) {
  if (notificationHandler == null) return;
  try {
    notificationHandler();
  } catch (e, s) {
    Zone.current.handleUncaughtError(e, s);
  }
}

class _ControllerStream<T> extends _StreamImpl<T> {
  _StreamControllerLifecycle<T> _controller;

  _ControllerStream(this._controller);

  StreamSubscription<T> _createSubscription(void onData(T data),
          Function onError, void onDone(), bool cancelOnError) =>
      _controller._subscribe(onData, onError, onDone, cancelOnError);

  // Override == and hashCode so that new streams returned by the same
  // controller are considered equal. The controller returns a new stream
  // each time it's queried, but doesn't have to cache the result.

  int get hashCode => _controller.hashCode ^ 0x35323532;

  bool operator ==(Object other) {
    if (identical(this, other)) return true;
    return other is _ControllerStream &&
        identical(other._controller, this._controller);
  }
}

class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> {
  final _StreamControllerLifecycle<T> _controller;

  _ControllerSubscription(this._controller, void onData(T data),
      Function onError, void onDone(), bool cancelOnError)
      : super(onData, onError, onDone, cancelOnError);

  Future _onCancel() {
    return _controller._recordCancel(this);
  }

  void _onPause() {
    _controller._recordPause(this);
  }

  void _onResume() {
    _controller._recordResume(this);
  }
}

/** A class that exposes only the [StreamSink] interface of an object. */
class _StreamSinkWrapper<T> implements StreamSink<T> {
  final StreamController _target;
  _StreamSinkWrapper(this._target);
  void add(T data) {
    _target.add(data);
  }

  void addError(Object error, [StackTrace stackTrace]) {
    _target.addError(error, stackTrace);
  }

  Future close() => _target.close();

  Future addStream(Stream<T> source) => _target.addStream(source);

  Future get done => _target.done;
}

/**
 * Object containing the state used to handle [StreamController.addStream].
 */
class _AddStreamState<T> {
  // [_Future] returned by call to addStream.
  final _Future addStreamFuture;

  // Subscription on stream argument to addStream.
  final StreamSubscription addSubscription;

  _AddStreamState(
      _EventSink<T> controller, Stream<T> source, bool cancelOnError)
      : addStreamFuture = new _Future(),
        addSubscription = source.listen(controller._add,
            onError: cancelOnError
                ? makeErrorHandler(controller)
                : controller._addError,
            onDone: controller._close,
            cancelOnError: cancelOnError);

  static makeErrorHandler(_EventSink controller) => (e, StackTrace s) {
        controller._addError(e, s);
        controller._close();
      };

  void pause() {
    addSubscription.pause();
  }

  void resume() {
    addSubscription.resume();
  }

  /**
   * Stop adding the stream.
   *
   * Complete the future returned by `StreamController.addStream` when
   * the cancel is complete.
   *
   * Return a future if the cancel takes time, otherwise return `null`.
   */
  Future cancel() {
    var cancel = addSubscription.cancel();
    if (cancel == null) {
      addStreamFuture._asyncComplete(null);
      return null;
    }
    return cancel.whenComplete(() {
      addStreamFuture._asyncComplete(null);
    });
  }

  void complete() {
    addStreamFuture._asyncComplete(null);
  }
}

class _StreamControllerAddStreamState<T> extends _AddStreamState<T> {
  // The subscription or pending data of a _StreamController.
  // Stored here because we reuse the `_varData` field  in the _StreamController
  // to store this state object.
  var varData;

  _StreamControllerAddStreamState(_StreamController<T> controller, this.varData,
      Stream<T> source, bool cancelOnError)
      : super(controller, source, cancelOnError) {
    if (controller.isPaused) {
      addSubscription.pause();
    }
  }
}
